﻿using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Reactive.Linq;
using System.Reactive.Threading;
using System.Reactive.Concurrency;

namespace TenSecSum
{
    class Program
    {
        static void Main(string[] args)
        {
            TenSecSample();
        }

        static void TenSecSample()
        {
            Console.WriteLine("#TenSecSample");
            var subscriber = GetSource()
                // 別スレッドで値を発行するIObservableのシーケンスにする
                .ToObservable(Scheduler.NewThread)
                // 1秒間隔で10秒間値をためる
                .Window(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1))
                // 10秒ためた値の合計をとって
                .SelectMany(o => o.Sum())
                // 異常か正常か判断
                .Select(i => new { Value = i, Message = i < 100 ? "正常" : "異常" })
                // 結果を表示
                .Subscribe(v => 
                    Console.WriteLine("{0:HH:mm:ss.fff} {1}", DateTime.Now, v));

            // Enterを押すまで待機
            Console.ReadLine();
            // 購読解除
            subscriber.Dispose();
            Console.WriteLine("--------------------------------");
        }

        // ダミーデータ作成メソッド
        static IEnumerable<int> GetSource()
        {
            var r = new Random();
            while (true)
            {
                yield return r.Next(10);
                Thread.Sleep(r.Next(1000));
            }
        }
    }
}
